
support large schema discovery #17394

Merged: 7 commits into master, Oct 5, 2022
Conversation

mfsiega-airbyte (Contributor) commented Sep 29, 2022

What

Support large catalog (aka schema) discovery.

How

Today, the discover schema job passes the discovered schema back as a return value, and the API handler then persists the schema and returns it (code ref: https://github.com/airbytehq/airbyte/blob/master/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java#L210). For large schemas, this hits a Temporal payload size limit, which refuses to pass such a large message.

Instead, we will persist the schema in the discover schema job itself and pass back only its id. The handler will then use the id to retrieve the schema from the db and return it.
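As a rough illustration of the approach (an editor's sketch, not the PR's actual code: `CatalogStore` and its methods are hypothetical stand-ins for the `ConfigRepository` calls the PR uses), the worker persists the large payload and returns only a small id through Temporal:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the database-backed catalog store.
class CatalogStore {
    private final Map<UUID, String> rows = new HashMap<>();

    // Worker side: persist the discovered catalog and return only its id.
    // The id is tiny, so it always fits under Temporal's payload limit.
    UUID persist(final String catalogJson) {
        final UUID id = UUID.randomUUID();
        rows.put(id, catalogJson);
        return id;
    }

    // API-handler side: resolve the id back into the full catalog.
    String fetch(final UUID id) {
        return rows.get(id);
    }
}
```

In the PR itself the store is the config database (via `ConfigRepository`), and only the returned id travels through the Temporal activity result.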

Recommended reading order

  1. airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml, modifies the output interface from the discover schema job.
  2. airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml and airbyte-config/config-models/src/main/resources/types/StandardDiscoverCatalogInput.yaml modify the input interface to the discover schema job, including some bits of info that are only needed so they can be persisted alongside the catalog. This is probably not ideal, but could be cleaned up after the fix is merged.
  3. airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java for changes in the API handler
  4. airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java where we actually persist the catalog
  5. Everything else

Tests

Updated unit tests to exercise the new behavior.

For manual e2e testing, used make-big-schema.sh to create a schema with 15k tables. Verified that discovery failed under the existing behavior and succeeded with this PR.

@github-actions github-actions bot added area/platform issues related to the platform area/server area/worker Related to worker labels Sep 29, 2022
pmossman (Contributor) left a comment:

@mfsiega-airbyte I gave this a first pass! I don't have a lot of hands-on experience with the Discover workflow, so I'm gonna tag in @davinchia to give this a review as well.

I think the approach looks good to me. Left a few suggestions around types, naming, etc.

For my own understanding, which parts of this were blocked on removing the catalog from the response of the connections list endpoint? I wanted to double check that blocker and make sure we're in the clear

description: Connector version
type: string
configHash:
description: Config hash
Contributor:

likewise, examples are always nice for generic 'string' type properties

mfsiega-airbyte (Author):
Wasn't exactly sure what an example of a hash should look like :) but I did put in some extra explanation?

      return discoverJobToOutput(response);
  }

- private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse<AirbyteCatalog> response) {
+ private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse<String> response) throws ConfigNotFoundException, IOException {
Contributor:
Now that this method is actually fetching from the database, I think a rename is appropriate so that it doesn't sound like it's doing a simple in-memory conversion operation

Contributor:
agree

@@ -257,16 +251,19 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So
  final SourceConnection source = new SourceConnection()
      .withSourceDefinitionId(sourceCreate.getSourceDefinitionId())
      .withConfiguration(partialConfig);
- final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
+ final SynchronousResponse<String> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag());
Contributor:

It definitely feels like we've lost some readability with the change from AirbyteCatalog to String here. If the ID were a UUID, it'd be pretty clear, i.e. SynchronousResponse<UUID>, but as a reader it feels like String could be anything. If this is actually a UUID, maybe we can find a way to type it as such so these response types are clear?

davinchia (Contributor) commented Sep 30, 2022:
I think it's a UUID, so we should be able to type it as such. I agree we should make clear this refers to a database id.

mfsiega-airbyte (Author):
Attempted to make this a bit clearer (on top of the UUID change indeed).
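For illustration, the typed-boundary idea discussed above could look like the following sketch (the `SynchronousResponse` here is a simplified stand-in, and `toCatalogIdResponse` is a hypothetical name): parse the job's raw string output into a `UUID` as early as possible so downstream signatures read `SynchronousResponse<UUID>` rather than `SynchronousResponse<String>`.

```java
import java.util.UUID;

// Minimal stand-in for the real SynchronousResponse wrapper.
class SynchronousResponse<T> {
    final T output;

    SynchronousResponse(final T output) {
        this.output = output;
    }
}

class DiscoverResponses {
    // Convert the raw string id from the job output into a typed UUID at the
    // scheduler-client boundary, so callers can't confuse it with other strings.
    static SynchronousResponse<UUID> toCatalogIdResponse(final String rawId) {
        return new SynchronousResponse<>(UUID.fromString(rawId));
    }
}
```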

- .withDockerImage(dockerImage);
+ .withDockerImage(dockerImage)
+ .withSourceId(source.getSourceId().toString())
+ .withConfigHash(HASH_FUNCTION.hashBytes(Jsons.serialize(source.getConfiguration()).getBytes(
Contributor:
This feels like the kind of thing that should maybe go into a helper or util class, and we can add some javadoc to explain why and how we're hashing the config. Otherwise I'd worry that this starts getting copy/pasted elsewhere and becomes hard to change if we ever need to

mfsiega-airbyte (Author):
Yeah this is a good call - Davin opened https://github.com/airbytehq/airbyte/issues/17488.
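For reference, the PR hashes the serialized config with Guava's `Hashing.md5()`; a stdlib-only sketch of the kind of shared helper suggested above (class and method names are hypothetical, not the PR's code) might look like:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ConfigHashes {
    private ConfigHashes() {}

    // Hash the serialized source configuration so a later discover job can
    // tell whether a cached catalog was fetched under the same config.
    // Centralising this avoids the cache-invalidation drift that scattered
    // copies of the hashing logic would cause.
    static String md5Hex(final String serializedConfig) {
        try {
            final MessageDigest md = MessageDigest.getInstance("MD5");
            final byte[] digest = md.digest(serializedConfig.getBytes(StandardCharsets.UTF_8));
            final StringBuilder hex = new StringBuilder(digest.length * 2);
            for (final byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```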

@pmossman pmossman requested a review from davinchia September 30, 2022 00:47

- return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG).withDiscoverCatalog(catalog.get());
+ final UUID catalogId =
+     configRepository.writeActorCatalogFetchEvent(catalog.get(), UUID.fromString(discoverSchemaInput.getSourceId()),
Contributor:
This should be moved to an api call in the future. Can we quickly write up an issue so we don't forget?

@@ -38,6 +41,8 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl

  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSynchronousSchedulerClient.class);

+ private static final HashFunction HASH_FUNCTION = Hashing.md5();
Contributor:
note: there are now multiple instances of this floating about. We should centralise this. This can happen in a follow-up PR, but it definitely needs to happen, otherwise we can bump heads against tricky cache invalidation issues.

davinchia (Contributor) left a comment:

Thanks @mfsiega-airbyte , very exciting!

Most of my comments are in the same vein as Parker's. One major comment about centralising the hashing function definition. It doesn't have to happen in this PR but should definitely happen relatively soon after to prevent any cache invalidation errors.

I'm asking Malik to clarify the hashing differences here: https://airbytehq-team.slack.com/archives/C03A8CSANPQ/p1664572102698309

Will review again after you make changes.

github-actions bot commented Oct 3, 2022

NOTE ⚠️ Changes in this PR affect the following connectors. Make sure to run corresponding integration tests:

  • source-elasticsearch
  • source-cockroachdb-strict-encrypt
  • source-clickhouse
  • source-cockroachdb
  • source-mssql-strict-encrypt
  • source-mysql-strict-encrypt
  • source-postgres
  • source-alloydb
  • source-redshift
  • source-clickhouse-strict-encrypt
  • source-bigquery
  • source-alloydb-strict-encrypt
  • source-mssql
  • source-scaffold-java-jdbc
  • source-snowflake
  • source-sftp
  • source-db2
  • source-e2e-test
  • source-e2e-test-cloud
  • source-db2-strict-encrypt
  • source-postgres-strict-encrypt
  • source-jdbc
  • source-tidb
  • source-mongodb-v2
  • source-mongodb-strict-encrypt
  • source-oracle
  • source-mysql
  • source-oracle-strict-encrypt
  • source-kafka

@@ -73,7 +78,10 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
  final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration());

  final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
- .withConnectionConfiguration(fullConfig);
+ .withConnectionConfiguration(fullConfig)
Contributor:
Do we need to version this in Temporal @pmossman ? Or is it ok to not do so because this is an activity impl and not a workflow?

davinchia (Contributor) left a comment:

@mfsiega-airbyte this looks good to me.

One open question from me on versioning to make sure we don't cause errors. We can address the hashing in a follow up PR.

Lmk if there are parts you want me to look at closer.

mfsiega-airbyte (Author):
Re workflow versioning: once #17562 is merged, any schema discovery workflows that are underway during a deployment will fail and need to be retried (rather than getting stuck).

Since this is relatively easy to recover from, planning to go ahead rather than specifically handling versioning (as soon as the linked PR is merged).


@mfsiega-airbyte mfsiega-airbyte merged commit 84f0bac into master Oct 5, 2022
@mfsiega-airbyte mfsiega-airbyte deleted the msiega/large-catalogs2 branch October 5, 2022 18:33
suhomud commented Oct 6, 2022

@mfsiega-airbyte integration tests for mysql-source are broken https://github.com/airbytehq/airbyte/actions/runs/3194916543/jobs/5215016534

mfsiega-airbyte (Author):
@suhomud I can take a look but it might take some time, so if you have a more specific error that points to this PR, I can probably track it down more quickly.

suhomud commented Oct 6, 2022

@mfsiega-airbyte looking as well. What I see so far: tests fail wherever connector discovery is involved, for example:

MySqlSourceAcceptanceTest > testDiscover() FAILED
    io.***.workers.exception.WorkerException: Error while discovering schema
    Caused by:
        java.lang.NullPointerException: Cannot invoke "String.length()" because "name" is null
            at java.base/java.util.UUID.fromString(UUID.java:237)
            at io.***.workers.general.DefaultDiscoverCatalogWorker.run(DefaultDiscoverCatalogWorker.java:90)
            ... 88 more

I assume not only mysql is affected

mfsiega-airbyte (Author):

Yeah indeed - these tests were never passing a sourceId, which was a bit wrong since the field is required, but it wasn't actually relied upon before.

I'll send a PR to fix shortly.
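For context on the failure mode: `UUID.fromString(null)` throws the opaque `NullPointerException` seen in the log above. A sketch of a defensive guard (hypothetical helper, not the actual fix; #17662 instead makes the integration tests supply a source id, since the field is required by the interface anyway):

```java
import java.util.UUID;

class DiscoverInputs {
    // UUID.fromString(null) fails with an opaque NullPointerException
    // ("Cannot invoke String.length() because name is null"); validating the
    // required field first makes the worker report what is actually missing.
    static UUID requireSourceId(final String sourceId) {
        if (sourceId == null || sourceId.isBlank()) {
            throw new IllegalArgumentException("discover input is missing required field sourceId");
        }
        return UUID.fromString(sourceId);
    }
}
```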

mfsiega-airbyte (Author):

@suhomud PTAL: #17662

suhomud commented Oct 6, 2022

@mfsiega-airbyte thanks for the quick fix! I ran the integration tests on #17662. Let's see if it fixes the issue.

mfsiega-airbyte (Author):

Looks like the tests passed and I merged, lmk if there are still issues!

letiescanciano added a commit that referenced this pull request Oct 6, 2022
…vation

* master: (26 commits)
  supply a source id for schema discovery in connector integration tests (#17662)
  Source Iterable: Add permission check for stream (#17602)
  Moving TrackingClientSingleton.initialize into the bean itself (#17631)
  Handle null workspace IDs in tracking/reporting methods gracefully (#17641)
  Bump Airbyte version from 0.40.11 to 0.40.12 (#17653)
  Revert "Do not wait the end of a reset to return an update (#17591)" (#17640)
  Standardize HttpRequester's url_base and path format (#17524)
  Create geography_type enum and add geography column in connection and workspace table (#16818)
  airbyte-cron: update connector definitions from remote (#16438)
  Do not wait the end of a reset to return an update (#17591)
  Remove redundant title labels from connector specs (#17544)
  Updated GA4 status
  support large schema discovery (#17394)
  🪟 🐛 Fixes connector checks not properly ending their loading state (#17620)
  🪟🧪 [Experiment] add hideOnboarding experiment (#17605)
  Source Recharge: change releaseStage to GA (#17606)
  Source Recharge: skip stream if 403 received (#17608)
  remove sonar-scan workflow (#17609)
  Mark/tables should be full width on all pages (#17401)
  Auto fail all workfow if there is a Versioning issue (#17562)
  ...
jhammarstedt pushed a commit to jhammarstedt/airbyte that referenced this pull request Oct 31, 2022
* support large schema discovery

* update generic source tests to handle new approach to schema discovery

* readability improvements related to schema discovery and large schema support

* update internal ScheduleHandler method name

* update source tests per new schema discovery interface